//! Duplicate detection job implementation

use crate::{
	domain::content_identity::ContentHashGenerator,
	infra::job::prelude::*,
	domain::addressing::{SdPath, SdPathBatch},
};
use serde::{Deserialize, Serialize};
use std::{
	collections::{HashMap, HashSet},
	path::PathBuf,
	time::{Duration, Instant},
};
use tokio::fs;
use uuid::Uuid;

/// Duplicate detection modes
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DetectionMode {
	/// Compare only file sizes
	SizeOnly,
	/// Compare file sizes and CAS IDs (content hash)
	ContentHash,
	/// Compare file names and sizes
	NameAndSize,
	/// Deep comparison with full content verification
	DeepScan,
}

/// Duplicate detection job for finding duplicate files
#[derive(Debug, Serialize, Deserialize)]
pub struct DuplicateDetectionJob {
	pub search_paths: SdPathBatch,
	pub mode: DetectionMode,
	pub min_file_size: u64,
	pub max_file_size: Option<u64>,
	pub file_extensions: Option<HashSet<String>>,

	// Internal state for resumption
	#[serde(skip)]
	processed_files: HashSet<PathBuf>,
	#[serde(skip)]
	size_groups: HashMap<u64, Vec<FileInfo>>,
	#[serde(skip, default = "Instant::now")]
	started_at: Instant,
}

/// File information for duplicate detection
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileInfo {
	pub path: SdPath,
	pub size: u64,
	pub content_hash: Option<String>,
	pub modified: Option<std::time::SystemTime>,
}

/// Duplicate group containing files with same content
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DuplicateGroup {
	pub files: Vec<FileInfo>,
	pub total_size: u64,
	pub wasted_space: u64, // Size that could be saved by keeping only one copy
	pub detection_method: String,
}

/// Duplicate detection progress information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DuplicateProgress {
	pub current_file: String,
	pub files_scanned: usize,
	pub total_files: usize,
	pub duplicates_found: usize,
	pub potential_savings: u64,
	pub current_operation: String,
}

impl JobProgress for DuplicateProgress {}

impl Job for DuplicateDetectionJob {
	const NAME: &'static str = "duplicate_detection";
	const RESUMABLE: bool = true;
	const DESCRIPTION: Option<&'static str> = Some("Find duplicate files");
}

impl crate::infra::job::traits::DynJob for DuplicateDetectionJob {
	fn job_name(&self) -> &'static str {
		Self::NAME
	}

	// DuplicateDetectionJob doesn't track specific entry resources, so use default None
}

#[async_trait::async_trait]
impl JobHandler for DuplicateDetectionJob {
	type Output = DuplicateDetectionOutput;

	async fn run(&mut self, ctx: JobContext<'_>) -> JobResult<Self::Output> {
		ctx.log(format!(
			"Starting duplicate detection with mode: {:?}",
			self.mode
		));

		// Collect all files to scan
		let files_to_scan = self.collect_files(&ctx).await?;
		let total_files = files_to_scan.len();

		ctx.log(format!(
			"Found {} files to scan for duplicates",
			total_files
		));

		// Phase 1: Group by size
		self.group_by_size(&files_to_scan, &ctx, total_files)
			.await?;

		// Phase 2: Further analysis based on mode
		let duplicate_groups = match self.mode {
			DetectionMode::SizeOnly => self.find_size_duplicates(&ctx).await?,
			DetectionMode::ContentHash => self.find_content_duplicates(&ctx).await?,
			DetectionMode::NameAndSize => self.find_name_size_duplicates(&ctx).await?,
			DetectionMode::DeepScan => self.find_deep_scan_duplicates(&ctx).await?,
		};

		let total_duplicates = duplicate_groups.iter().map(|g| g.files.len() - 1).sum();
		let potential_savings: u64 = duplicate_groups.iter().map(|g| g.wasted_space).sum();

		ctx.log(format!("Duplicate detection completed: {} groups found, {} duplicates, {} bytes potential savings",
            duplicate_groups.len(), total_duplicates, potential_savings));

		Ok(DuplicateDetectionOutput {
			duplicate_groups,
			total_files_scanned: total_files,
			total_duplicates,
			potential_savings,
			duration: self.started_at.elapsed(),
			detection_mode: self.mode.clone(),
		})
	}
}

impl DuplicateDetectionJob {
	/// Create a new duplicate detection job
	pub fn new(search_paths: SdPathBatch, mode: DetectionMode) -> Self {
		Self {
			search_paths,
			mode,
			min_file_size: 1024, // 1KB minimum
			max_file_size: None,
			file_extensions: None,
			processed_files: HashSet::new(),
			size_groups: HashMap::new(),
			started_at: Instant::now(),
		}
	}

	/// Set minimum file size filter
	pub fn with_min_size(mut self, min_size: u64) -> Self {
		self.min_file_size = min_size;
		self
	}

	/// Set maximum file size filter
	pub fn with_max_size(mut self, max_size: u64) -> Self {
		self.max_file_size = Some(max_size);
		self
	}

	/// Set file extension filter
	pub fn with_extensions(mut self, extensions: Vec<String>) -> Self {
		self.file_extensions = Some(extensions.into_iter().collect());
		self
	}

	/// Collect all files to scan
	async fn collect_files(&self, ctx: &JobContext<'_>) -> JobResult<Vec<FileInfo>> {
		let mut files = Vec::new();

		for search_path in &self.search_paths.paths {
			ctx.check_interrupt().await?;

			if let Some(local_path) = search_path.as_local_path() {
				self.collect_files_recursive(local_path, search_path, &mut files, ctx)
					.await?;
			}
		}

		Ok(files)
	}

	/// Collect files from a directory using iterative approach
	async fn collect_files_recursive(
		&self,
		path: &std::path::Path,
		sd_path: &SdPath,
		files: &mut Vec<FileInfo>,
		ctx: &JobContext<'_>,
	) -> JobResult<()> {
		let mut stack = vec![(path.to_path_buf(), sd_path.clone())];

		while let Some((current_path, current_sd_path)) = stack.pop() {
			ctx.check_interrupt().await?;

			let metadata = fs::metadata(&current_path).await?;

			if metadata.is_file() {
				if self.should_include_file(&current_path, metadata.len()) {
					files.push(FileInfo {
						path: current_sd_path,
						size: metadata.len(),
						content_hash: None,
						modified: metadata.modified().ok(),
					});
				}
			} else if metadata.is_dir() {
				let mut dir = fs::read_dir(&current_path).await?;

				while let Some(entry) = dir.next_entry().await? {
					let entry_path = entry.path();
					let entry_sd_path = current_sd_path.join(entry.file_name());
					stack.push((entry_path, entry_sd_path));
				}
			}
		}

		Ok(())
	}

	/// Check if a file should be included based on filters
	fn should_include_file(&self, path: &std::path::Path, size: u64) -> bool {
		// Size filters
		if size < self.min_file_size {
			return false;
		}

		if let Some(max_size) = self.max_file_size {
			if size > max_size {
				return false;
			}
		}

		// Extension filter
		if let Some(ref extensions) = self.file_extensions {
			if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
				if !extensions.contains(&ext.to_lowercase()) {
					return false;
				}
			} else {
				// No extension - exclude if we have extension filter
				return false;
			}
		}

		true
	}

	/// Group files by size for initial duplicate detection
	async fn group_by_size(
		&mut self,
		files: &[FileInfo],
		ctx: &JobContext<'_>,
		total_files: usize,
	) -> JobResult<()> {
		for (index, file) in files.iter().enumerate() {
			ctx.check_interrupt().await?;

			ctx.progress(Progress::structured(DuplicateProgress {
				current_file: file.path.display(),
				files_scanned: index + 1,
				total_files,
				duplicates_found: 0,
				potential_savings: 0,
				current_operation: "Grouping by size".to_string(),
			}));

			self.size_groups
				.entry(file.size)
				.or_insert_with(Vec::new)
				.push(file.clone());

			// Checkpoint every 100 files
			if index % 100 == 0 {
				ctx.checkpoint().await?;
			}
		}

		Ok(())
	}

	/// Find duplicates based on size only
	async fn find_size_duplicates(&self, ctx: &JobContext<'_>) -> JobResult<Vec<DuplicateGroup>> {
		let mut groups = Vec::new();

		for (size, files) in &self.size_groups {
			if files.len() > 1 {
				let wasted_space = *size * (files.len() as u64 - 1);
				groups.push(DuplicateGroup {
					files: files.clone(),
					total_size: *size * files.len() as u64,
					wasted_space,
					detection_method: "Size comparison".to_string(),
				});
			}
		}

		Ok(groups)
	}

	/// Find duplicates based on content hash
	async fn find_content_duplicates(
		&self,
		ctx: &JobContext<'_>,
	) -> JobResult<Vec<DuplicateGroup>> {
		let mut groups = Vec::new();
		let mut processed = 0;
		let total_candidates: usize = self
			.size_groups
			.values()
			.map(|files| if files.len() > 1 { files.len() } else { 0 })
			.sum();

		for (size, files) in &self.size_groups {
			if files.len() > 1 {
				ctx.check_interrupt().await?;

				// Generate content hashes for files with same size
				let mut hash_groups: HashMap<String, Vec<FileInfo>> = HashMap::new();

				for file in files {
					ctx.progress(Progress::structured(DuplicateProgress {
						current_file: file.path.display(),
						files_scanned: processed + 1,
						total_files: total_candidates,
						duplicates_found: groups.len(),
						potential_savings: groups
							.iter()
							.map(|g: &DuplicateGroup| g.wasted_space)
							.sum(),
						current_operation: "Computing content hashes".to_string(),
					}));

					if let Some(local_path) = file.path.as_local_path() {
						match ContentHashGenerator::generate_content_hash(local_path).await {
							Ok(content_hash) => {
								let mut file_with_cas = file.clone();
								file_with_cas.content_hash = Some(content_hash.clone());
								hash_groups
									.entry(content_hash)
									.or_insert_with(Vec::new)
									.push(file_with_cas);
							}
							Err(e) => {
								ctx.add_non_critical_error(format!(
									"Failed to generate CAS ID for {}: {}",
									file.path.display(),
									e
								));
							}
						}
					}

					processed += 1;
				}

				// Create groups for files with same hash
				for (hash, hash_files) in hash_groups {
					if hash_files.len() > 1 {
						let wasted_space = *size * (hash_files.len() as u64 - 1);
						groups.push(DuplicateGroup {
							files: hash_files,
							total_size: *size * files.len() as u64,
							wasted_space,
							detection_method: format!("Content hash: {}", &hash[..8]),
						});
					}
				}
			}
		}

		Ok(groups)
	}

	/// Find duplicates based on name and size
	async fn find_name_size_duplicates(
		&self,
		ctx: &JobContext<'_>,
	) -> JobResult<Vec<DuplicateGroup>> {
		let mut groups = Vec::new();
		let mut name_size_groups: HashMap<(String, u64), Vec<FileInfo>> = HashMap::new();

		for (size, files) in &self.size_groups {
			if files.len() > 1 {
				for file in files {
					if let Some(filename) = file.path.file_name() {
						let key = (filename.to_string(), *size);
						name_size_groups
							.entry(key)
							.or_insert_with(Vec::new)
							.push(file.clone());
					}
				}
			}
		}

		for ((name, size), files) in name_size_groups {
			if files.len() > 1 {
				let file_count = files.len() as u64;
				let wasted_space = size * (file_count - 1);
				groups.push(DuplicateGroup {
					files,
					total_size: size * file_count,
					wasted_space,
					detection_method: format!("Name + size: {}", name),
				});
			}
		}

		Ok(groups)
	}

	/// Find duplicates with deep scanning (byte-by-byte comparison)
	async fn find_deep_scan_duplicates(
		&self,
		ctx: &JobContext<'_>,
	) -> JobResult<Vec<DuplicateGroup>> {
		// For deep scan, we first use content hash, then verify with byte comparison
		let mut hash_groups = self.find_content_duplicates(ctx).await?;

		// Additional verification for critical duplicates could go here
		// For now, content hash is sufficient for deep scanning

		for group in &mut hash_groups {
			group.detection_method = "Deep scan with content verification".to_string();
		}

		Ok(hash_groups)
	}
}

/// Job output for duplicate detection
#[derive(Debug, Serialize, Deserialize)]
pub struct DuplicateDetectionOutput {
	pub duplicate_groups: Vec<DuplicateGroup>,
	pub total_files_scanned: usize,
	pub total_duplicates: usize,
	pub potential_savings: u64,
	pub duration: Duration,
	pub detection_mode: DetectionMode,
}

impl From<DuplicateDetectionOutput> for JobOutput {
	fn from(output: DuplicateDetectionOutput) -> Self {
		JobOutput::DuplicateDetection {
			duplicate_groups: output.duplicate_groups.len(),
			total_duplicates: output.total_duplicates,
			potential_savings: output.potential_savings,
		}
	}
}
